package com.ithuameng.cosy.acceptor;

import com.ithuameng.cosy.acceptor.config.WebsocketConfig;
import com.ithuameng.cosy.coder.json.TextMessageDecoder;
import com.ithuameng.cosy.coder.json.TextMessageEncoder;
import com.ithuameng.cosy.coder.protobuf.WebMessageDecoder;
import com.ithuameng.cosy.coder.protobuf.WebMessageEncoder;
import com.ithuameng.cosy.constant.WebsocketProtocol;
import com.ithuameng.cosy.handler.IllegalRequestHandler;
import com.ithuameng.cosy.handshake.HandshakeHandler;
import com.ithuameng.cosy.util.SSLContextUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.util.concurrent.TimeUnit;

/**
 * Acceptor that serves clients over the WebSocket protocol.
 *
 * <p>When enabled in the supplied {@link WebsocketConfig}, {@link #bind()} starts a Netty server
 * on the configured port. Each accepted channel gets an (optional) TLS handler, the HTTP codec
 * stack needed for the WebSocket upgrade, a handshake check, a JSON or protobuf message codec,
 * idle detection, logging, this acceptor itself as the business handler, and finally a handler
 * for requests nothing else dealt with.
 *
 * <p>The instance is added to every channel pipeline, hence the {@code Sharable} annotation.
 *
 * @author ithuameng
 */
@ChannelHandler.Sharable
public class WebsocketAcceptor extends NioSocketAcceptor {

    private static final String JSON_BANNER = "*\n\n\nWebsocket Server started on port {} for [JSON] mode.\n\n";

    private static final String PROTOBUF_BANNER = "*\n\n\nWebsocket Server started on port {} for [protobuf] mode.\n\n";

    private final WebsocketConfig config;

    private final HandshakeHandler handshakeHandler;

    private final ChannelHandler illegalRequestHandler = new IllegalRequestHandler();

    /**
     * Lazily created TLS context shared by all channels. Building it once avoids reloading the
     * key material for every connection and lets connections share the context's session cache.
     */
    private volatile SSLContext sslContext;

    /**
     * Creates the acceptor.
     *
     * @param config WebSocket settings; supplies the outer request handler passed to the
     *               superclass and the predicate used by the handshake handler
     */
    public WebsocketAcceptor(WebsocketConfig config) {
        super(config.getOuterRequestHandler());
        this.config = config;
        this.handshakeHandler = new HandshakeHandler(config.getHandshakePredicate());
    }

    /**
     * Starts the WebSocket server on the configured port.
     *
     * <p>Does nothing when the configuration is not enabled. Otherwise blocks until the port is
     * bound, logs a start-up banner for the active protocol, and arranges for
     * {@code destroy()} to run when the server channel closes.
     */
    public void bind() {

        if (!config.isEnable()) {
            return;
        }

        ServerBootstrap bootstrap = createServerBootstrap();
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                // TLS: one engine per channel, all created from the shared context.
                if (config.isSslEnable()) {
                    SSLEngine sslEngine = sharedSslContext().createSSLEngine();
                    sslEngine.setNeedClientAuth(false);
                    sslEngine.setUseClientMode(false);
                    ch.pipeline().addFirst("ssl", new SslHandler(sslEngine));
                }
                // HTTP request decoding / response encoding (combined codec), required for the
                // initial upgrade request.
                ch.pipeline().addLast(new HttpServerCodec());
                // Allows large payloads to be written in chunks.
                ch.pipeline().addLast(new ChunkedWriteHandler());
                // Merges the parts of an HTTP message into one full message (content capped at 4 KiB).
                ch.pipeline().addLast(new HttpObjectAggregator(4 * 1024));
                // Performs the HTTP -> WebSocket upgrade (101 response) for the configured path;
                // the boolean enables prefix matching of the request URI against that path.
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(config.getPath(), true));
                // Authorization check at WebSocket handshake time.
                ch.pipeline().addLast(handshakeHandler);
                if (config.getProtocol() == WebsocketProtocol.JSON) {
                    // JSON message codec
                    ch.pipeline().addLast(new TextMessageDecoder());
                    ch.pipeline().addLast(new TextMessageEncoder());
                } else {
                    // Protobuf message codec (used for every non-JSON protocol value)
                    ch.pipeline().addLast(new WebMessageDecoder());
                    ch.pipeline().addLast(new WebMessageEncoder());
                }
                // Heartbeat support: raises idle events after the configured read/write idle times.
                ch.pipeline().addLast(new IdleStateHandler(readIdle.getSeconds(), writeIdle.getSeconds(), 0, TimeUnit.SECONDS));
                // Logging handler
                ch.pipeline().addLast(loggingHandler);
                // Core WebSocket handler (this acceptor)
                ch.pipeline().addLast(WebsocketAcceptor.this);
                // Handler for illegal requests; last in the pipeline
                ch.pipeline().addLast(illegalRequestHandler);
            }
        });
        // syncUninterruptibly() rethrows a bind failure, so past this line the bind has succeeded.
        ChannelFuture channelFuture = bootstrap.bind(config.getPort()).syncUninterruptibly();
        // The future is already complete, so the listener is notified right away (on the event loop).
        channelFuture.addListener(future -> {
            if (config.getProtocol() == WebsocketProtocol.JSON) {
                logger.info(JSON_BANNER, config.getPort());
            }
            if (config.getProtocol() == WebsocketProtocol.PROTOBUF) {
                logger.info(PROTOBUF_BANNER, config.getPort());
            }
        });
        channelFuture.channel().closeFuture().addListener(future -> this.destroy());
    }

    /**
     * Returns the TLS context for this acceptor, creating it from the configured SSL type, path
     * and password on first use.
     *
     * <p>Channels may be initialized on several threads at once, so creation is guarded by a
     * double-checked lock on the volatile field. A failed creation is not cached; the next
     * connection attempts it again.
     *
     * @return the shared context
     * @throws Exception whatever {@code SSLContextUtil.createSSLContext} raises
     */
    private SSLContext sharedSslContext() throws Exception {
        SSLContext context = sslContext;
        if (context == null) {
            synchronized (this) {
                context = sslContext;
                if (context == null) {
                    context = SSLContextUtil.createSSLContext(config.getSslType(), config.getSslPath(), config.getSslPassword());
                    sslContext = context;
                }
            }
        }
        return context;
    }
}
